At times you might need to monitor data that has been stored in SQL Server, and sometimes that data needs to be migrated from one place to another. If you need to transfer data from one database server to a back-end database server, you have several options. One way is to have your service monitor for the data and then query the records, create a file with the data, and wait for another application to pick that file up and process it. Another way is to have your service not only pick up the data, but also process it. I have already covered submitting data to SQL Server, but now let’s create an example of monitoring SQL Server for data to send to a simulated back-end server.
The goal should always be to optimize your service’s interactions with the system and your data. In this case, because you are monitoring a database table for data, it doesn’t make sense to have more than one thread doing so: with several threads you would have to synchronize your data reads as well as remember what data you have already queried.
Suppose you have a database table full of customer data from a Web site, and you want to move this data to your mainframe. You have many options, including Host Integration Server or other transport layers, but a service is a fairly robust and simple solution. However, if you are copying records from the table into a flat file or XML format for your mainframe or other back-end server, you have to keep track of the records you have already copied. You could have a shadow table in the database to which you copy records, deleting them once the transaction is complete. Another solution is to have a bit field in the database table that specifies whether the data has been copied out. For our example we will use a simple query method to grab the data and write it to our back-end data store. We will not be concerned in this section with grabbing duplicate data.
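To make the bit-field idea concrete, here is a minimal in-memory sketch. It is not the approach this section uses. The SourceRow class, its Copied flag, and the GetPending function are hypothetical names invented for illustration; in a real table the flag would be a bit column and the filter would be part of the database query.

```vbnet
Imports System
Imports System.Collections.Generic
Imports System.Linq

Module CopiedFlagSketch

    ' Hypothetical stand-in for a row in the source table.
    Public Class SourceRow
        Public Name As String
        Public Copied As Boolean   ' plays the role of the bit field

        Public Sub New(ByVal name As String, ByVal copied As Boolean)
            Me.Name = name
            Me.Copied = copied
        End Sub
    End Class

    ' Returns only the rows that have not been copied out yet.
    Public Function GetPending(ByVal rows As IEnumerable(Of SourceRow)) As SourceRow()
        Return (From r In rows Where Not r.Copied Select r).ToArray()
    End Function

    Sub Main()
        Dim table As New List(Of SourceRow)
        table.Add(New SourceRow("Alice", True))
        table.Add(New SourceRow("Bob", False))
        table.Add(New SourceRow("Carol", False))

        For Each row As SourceRow In GetPending(table)
            Console.WriteLine("Copying " & row.Name)
            row.Copied = True      ' mark it so the next pass skips it
        Next

        Console.WriteLine("Still pending: " & GetPending(table).Length.ToString())
    End Sub

End Module
```

Running the sketch copies Bob and Carol, skips Alice, and then reports that nothing is still pending.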
Creating the Back-End Data Store
In this section we’ll create a duplicate of the original Users table, but in a secondary database called BackEnd.
In Microsoft SQL Server, create a secondary database and name it BackEnd.
Next, create a table called Users, using the exact structure with which you created the original Users table in our Tutorials database, and re-create the SaveUser stored procedure as well. The only difference between the original and new configurations should be the name of the database.
Creating a New Connection String
Now that we have our data store, we need to create a new connection string to use with our current Users.dbml. Because the objects are identical, we can reuse the current implementation. We do, however, need to point the underlying UsersDataContext object at the appropriate data store.
On the Properties tab of the service, select Settings. You will see our current TutorialsConnectionString. Add a new settings entry called BackEndConnectionString. Set the type to Connection String, set the scope to Application, and then copy the TutorialsConnectionString value into the new entry. Change the database name in the copied value from Tutorials to BackEnd and close the Properties window. Save the service.
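If you would rather derive the second connection string in code than copy it by hand, the SqlConnectionStringBuilder class in System.Data.SqlClient can swap the database name. The following is only a sketch: RetargetDatabase is a hypothetical helper, and the sample connection string is a made-up value, not the one stored in your settings.

```vbnet
Imports System
Imports System.Data.SqlClient

Module ConnectionStringSketch

    ' Returns a copy of the connection string that targets another database.
    ' (Hypothetical helper; not part of the service's code.)
    Public Function RetargetDatabase(ByVal connectionString As String, _
                                     ByVal databaseName As String) As String
        Dim builder As New SqlConnectionStringBuilder(connectionString)
        builder.InitialCatalog = databaseName
        Return builder.ConnectionString
    End Function

    Sub Main()
        ' Made-up value; use whatever your TutorialsConnectionString holds.
        Dim tutorials As String = _
            "Data Source=.\SQLEXPRESS;Initial Catalog=Tutorials;Integrated Security=True"

        Console.WriteLine(RetargetDatabase(tutorials, "BackEnd"))
    End Sub

End Module
```

Keeping both values in the settings file, as this section does, has the advantage that an administrator can point either connection at a different server without recompiling.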
Updating the LINQSQL Class Connection String
Now that we have a new connection string we need to update our LINQSQL
class, which is currently set to only use the default connection
string. To do this, we add a data member of type UsersDataContext and
then modify our constructor to accept the connection string. Listing 1 shows the modified LINQSQL class.
Listing 1. Updating the LINQSQL class and constructor.
    Public Class LINQSQL
        Private m_UDC As UsersDataContext

        Public Sub New(ByVal ConnectionString As String)
            m_UDC = New UsersDataContext(ConnectionString)
        End Sub
Now that we have our new data member, we have to update our methods to use it instead of the local variable instance we’ve been using.
Updating InsertRecord Implementations
For both <InsertRecord>
implementations we want to update the code to use the class data member
instance. This step enhances our code because now it will not need to
initialize a new instance on each call. Listing 2 shows the new implementation of the code.
Listing 2. The new <InsertRecord> implementations.
    Public Function InsertRecord( _
            ByVal First As String, _
            ByVal Last As String, _
            ByVal Address1 As String, _
            ByVal Address2 As String, _
            ByVal City As String, _
            ByVal State As String, _
            ByVal Zip As String, _
            ByVal phone As String, _
            ByVal UserID As Guid _
            ) As Boolean
        Try
            Dim newuser As User = New User()

            newuser.UserID = UserID
            newuser.FirstName = First
            newuser.LastName = Last
            newuser.Address1 = Address1
            newuser.Address2 = Address2
            newuser.City = City
            newuser.State = State
            newuser.Zipcode = Zip
            newuser.Phone = phone

            'InsertOnSubmit is the release name of this method
            '(prerelease builds of LINQ to SQL called it Add).
            m_UDC.Users.InsertOnSubmit(newuser)
            m_UDC.SubmitChanges()

            Return True
        Catch sqlex As SqlClient.SqlException
            If sqlex.Number = 2627 Then
                'It was a duplicate.
                Return True
            End If
            'Any other SQL error is reported to the caller as a failure.
            Return False
        Catch ex As Exception
            Throw New Exception(ex.ToString)
        End Try
    End Function

    Public Function InsertRecord(ByVal pszRecord As String) As Boolean
        Try
            Dim pszQuery As String = Nothing
            Dim pszVars() As String = Split(pszRecord, ",")
            Dim newuser As User = New User()

            newuser.FirstName = pszVars(0)
            newuser.LastName = pszVars(1)
            newuser.Address1 = pszVars(2)
            newuser.Address2 = pszVars(3)
            newuser.City = pszVars(4)
            newuser.State = pszVars(5)
            newuser.Zipcode = pszVars(6)
            newuser.Phone = pszVars(7)
            newuser.UserID = Guid.NewGuid()

            m_UDC.Users.InsertOnSubmit(newuser)
            m_UDC.SubmitChanges()

            Return True
        Catch sqlex As SqlClient.SqlException
            If sqlex.Number = 2627 Then
                'It was a duplicate.
                Return True
            End If
            'Any other SQL error is reported to the caller as a failure.
            Return False
        Catch ex As Exception
            Throw New Exception(ex.ToString)
        End Try
    End Function
These
changes are important because we will now connect to a secondary
database implementation that uses the same format as our source
database.
Implementing a Dispose Method for the LINQSQL Class
Now that we are using a class member, we want to make sure we can also clean it up, so we add the method shown in Listing 3.
Listing 3. Implementing the <Dispose> method.
    Public Sub Dispose()
        Try
            m_UDC.Dispose()
        Catch ex As Exception
            m_UDC = Nothing
        End Try
    End Sub
We also need to update any external code that creates an instance of this object so that it calls this method. In our case, we will want to call it in our FileWorker class.
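A common alternative, not used in this section’s listings, is to have the class implement IDisposable so that callers can wrap it in a Using block and get cleanup even when an exception is thrown. The sketch below assumes a hypothetical ResourceOwner class standing in for LINQSQL; its m_Disposed flag takes the place of the real call to m_UDC.Dispose().

```vbnet
Imports System

Module DisposeSketch

    ' Hypothetical stand-in for a class that owns a disposable member,
    ' the way LINQSQL owns its data context.
    Public Class ResourceOwner
        Implements IDisposable

        Private m_Disposed As Boolean = False

        Public ReadOnly Property IsDisposed() As Boolean
            Get
                Return m_Disposed
            End Get
        End Property

        Public Sub Dispose() Implements IDisposable.Dispose
            If Not m_Disposed Then
                ' Release the owned member here (for example, m_UDC.Dispose()).
                m_Disposed = True
            End If
        End Sub
    End Class

    Sub Main()
        Dim owner As New ResourceOwner()
        Using owner
            Console.WriteLine("Working; disposed = " & owner.IsDisposed.ToString())
        End Using
        ' Dispose has been called automatically by End Using.
        Console.WriteLine("After Using; disposed = " & owner.IsDisposed.ToString())
    End Sub

End Module
```

The first line reports False and the second reports True, because End Using calls Dispose on the way out of the block.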
Updating the <FileWorker.ProcessFiles> Method
Now that we are required to pass in the connection string, we need to update the <ProcessFiles> method to reflect this change, as shown in Listing 4.
Listing 4. Updating the call to <InsertRecord> from <ProcessFiles>.
    Private Sub ProcessFiles()
        Dim LinqSql As LINQSQL = New LINQSQL(My.Settings.TutorialsConnectionString)

        While Not m_ThreadAction.StopThread
            If Not m_ThreadAction.Pause Then
                Try
                    For Each TextFile As String In My.Computer.FileSystem.GetFiles( _
                            m_FileWorkerOptions.Output, _
                            FileIO.SearchOption.SearchTopLevelOnly, _
                            m_FileWorkerOptions.FileType)
                        If m_ThreadAction.Pause Or m_ThreadAction.StopThread Then
                            Exit For
                        End If
                        Try
                            Dim tmpGuid As String = Guid.NewGuid().ToString()

                            Dim ProcessFile As String = m_FileWorkerOptions.ProcessedPath _
                                + "\" + tmpGuid + "_" + My.Computer.FileSystem.GetName(TextFile)

                            'File is moved so lets read it out of the Output Folder
                            If (Me.MailEnabled) Then
                                Dim message As String
                                Try
                                    message = "Processing File Data:[" + _
                                        My.Computer.FileSystem.ReadAllText(TextFile) + _
                                        "]] From File - "
                                Catch ex As Exception
                                    message = "Unable to read from file - "
                                End Try
                                'Send the Email and then move it again to the processed Folder
                                m_FileWorkerOptions.EmailProperties.Message = message
                                m_SmtpClient.QueueMail(m_FileWorkerOptions.EmailProperties)
                            End If
                            Dim records As StreamReader = _
                                My.Computer.FileSystem.OpenTextFileReader(TextFile)
                            Dim record As String
                            record = records.ReadLine()
                            While (Not record Is Nothing)
                                Try
                                    Dim insert As Boolean = LinqSql.InsertRecord(record)

                                    If Not insert Then
                                        'Save it
                                        System.Threading.ThreadPool.QueueUserWorkItem( _
                                            New WaitCallback(AddressOf ProcessFailures), record)
                                    End If
                                Catch ex As Exception
                                    'Save it
                                    System.Threading.ThreadPool.QueueUserWorkItem( _
                                        New WaitCallback(AddressOf ProcessFailures), _
                                        record + "-" + ex.ToString)
                                    WriteLogEvent(My.Resources.ThreadIOError + "_" + ex.ToString _
                                        + "_" + Now.ToString, THREAD_ERROR, EventLogEntryType.Error, _
                                        My.Resources.Source)
                                End Try

                                record = records.ReadLine
                            End While

                            records.Close()
                            records.Dispose()
                            My.Computer.FileSystem.MoveFile(TextFile, ProcessFile, True)
                            System.Threading.Thread.Sleep(0)
                            WriteLogEvent(My.Resources.ThreadMessage + _
                                TextFile, THREAD_INFO, EventLogEntryType.Information, _
                                My.Resources.Source)
                        Catch ex As Exception
                            WriteLogEvent(My.Resources.ThreadErrorMessage + _
                                "_" + ex.ToString + "_" + Now.ToString, THREAD_ERROR, _
                                EventLogEntryType.Error, My.Resources.Source)
                        End Try
                    Next
                Catch fio As IOException
                    WriteLogEvent(My.Resources.ThreadIOError + "_" + fio.ToString _
                        + "_" + Now.ToString, THREAD_ABORT_ERROR, _
                        EventLogEntryType.Error, My.Resources.Source)
                Catch tab As ThreadAbortException
                    'this must be listed first as Exception is the master catch
                    'Clean up thread here
                    WriteLogEvent(My.Resources.ThreadAbortMessage + "_" _
                        + tab.ToString + "_" + Now.ToString, THREAD_ABORT_ERROR, _
                        EventLogEntryType.Error, My.Resources.Source)
                Catch ex As Exception
                    WriteLogEvent(My.Resources.ThreadErrorMessage + "_" + _
                        ex.ToString + "_" + Now.ToString, THREAD_ERROR, _
                        EventLogEntryType.Error, My.Resources.Source)
                End Try
            End If
            If Not m_ThreadAction.StopThread Then
                Thread.Sleep(THREAD_WAIT)
            End If
        End While

        'Release the data context once the thread has been asked to stop
        LinqSql.Dispose()
    End Sub
I’m updating this method
because in this demonstration we want to emulate a situation in which
newly created records are being inserted into a source database and then
migrated to our back-end data store.
Creating a New <ProcessRecords> Method
At this point we can insert records into our source database; now we need to add a method that processes these records by moving them to our back-end store. Of course, in this example we could simply add the same record to both our primary and back-end stores. That solution would make sense if a single incoming record had a multi-destination requirement. In our case, however, we want to show a migration of the data. We are emulating a circumstance in which the record may already exist, or in which going directly from the source to the destination is not possible. Listing 5 shows the new <ProcessRecords> method.
Listing 5. The new <ProcessRecords> method with support for data migration.
    Private Sub ProcessRecords()
        While Not (m_ThreadAction.StopThread)
            If Not (m_ThreadAction.Pause) Then
                Try
                    Dim source As LINQSQL = New _
                        LINQSQL(My.Settings.TutorialsConnectionString)

                    Dim destination As LINQSQL = New _
                        LINQSQL(My.Settings.BackEndConnectionString)

                    Dim AllUsers() As User = source.GetUsers

                    For Each user In AllUsers
                        Try
                            destination.InsertRecord(user.FirstName, user.LastName, _
                                user.Address1, user.Address2, user.City, user.State, _
                                user.Zipcode, user.Phone, user.UserID)
                        Catch ex As Exception
                            WriteLogEvent(My.Resources.ThreadErrorMessage + ex.ToString, _
                                THREAD_ERROR, EventLogEntryType.Error, My.Resources.Source)
                        End Try
                    Next

                    source.Dispose()
                    destination.Dispose()
                Catch ex As Exception
                    WriteLogEvent(My.Resources.ThreadErrorMessage + ex.ToString, _
                        THREAD_ERROR, EventLogEntryType.Error, My.Resources.Source)
                End Try
            End If
            If Not m_ThreadAction.StopThread Then
                Thread.Sleep(THREAD_WAIT)
            End If
        End While
    End Sub
As with other methods, we loop, looking for new records to migrate, while also checking whether we are supposed to stop or pause.
We also create two instances of our LINQSQL class, one for each data store we are connecting to. Our source instance is where we query our records from, and our destination instance is where we migrate them to.
Note two things about this new method:
We use our source instance to call <GetUsers>, a new method that I will demonstrate shortly, and store the returned users in an array.
We then loop through every user returned from the source and call the <InsertRecord> method on the destination for each one.
Now that we have our ProcessRecords method, we need to give it a thread in the FileWorker class and then add the <GetUsers> method to the LINQSQL class so that we can return the records we query from our source.
Updating the FileWorker Class
We need to add thread support for the new method in Listing 5. This requires a new thread definition in our FileWorker class, a new property that exposes this thread to the service, and an update to the <Start> method, as shown in Listing 6.
Listing 6. Updated FileWorker class with <ProcessRecords> thread support.
    Private m_CopyRecords As Thread = Nothing

    Public Sub Start()
        m_Incoming = New Thread(AddressOf ProcessIncoming)
        m_Incoming.Priority = ThreadPriority.Normal
        m_Incoming.IsBackground = True
        m_Incoming.Start()

        m_Outgoing = New Thread(AddressOf ProcessFiles)
        m_Outgoing.Priority = ThreadPriority.Normal
        m_Outgoing.IsBackground = True
        m_Outgoing.Start()

        m_CopyRecords = New Thread(AddressOf ProcessRecords)
        m_CopyRecords.Priority = ThreadPriority.Normal
        m_CopyRecords.IsBackground = True
        m_CopyRecords.Start()
    End Sub

    Public ReadOnly Property CopyRecords() As Thread
        Get
            Return m_CopyRecords
        End Get
    End Property
Each piece of code in Listing 6 reflects a change to the current FileWorker implementation. The <Start> method has been updated to use the newly created m_CopyRecords thread variable to run an instance of the <ProcessRecords> method.
With the changes in Listing 6, we need to update the Tutorials.vb class to clean up the newly created FileWorker thread. Listing 7 shows the updates to the <Tutorials.OnStop> method.
Listing 7. Updated Tutorials.vb <OnStop> with FileWorker CopyRecords thread cleanup support.
    Protected Overrides Sub OnStop()
        ' Add code here to perform any tear-down necessary to stop your service.
        Try
            If (Not m_WorkerThread Is Nothing) Then
                Try
                    WriteLogEvent(My.Resources.ServiceStopping, ONSTOP_INFO, _
                        EventLogEntryType.Information, My.Resources.Source)

                    m_ThreadAction.StopThread = True

                    For Each fw As FileWorker In m_WorkerThreads
                        Me.RequestAdditionalTime(THIRTY_SECONDS)
                        fw.Incoming.Join(TIME_OUT)

                        Me.RequestAdditionalTime(THIRTY_SECONDS)
                        fw.Outgoing.Join(TIME_OUT)

                        Me.RequestAdditionalTime(THIRTY_SECONDS)
                        fw.CopyRecords.Join(TIME_OUT)
                    Next
                Catch ex As Exception
                    m_WorkerThread = Nothing
                End Try
            End If
        Catch ex As Exception
            'We Catch the Exception
            'to avoid any unhandled errors
            'since we are stopping and
            'logging an event is what failed
            'we will merely write the output
            'to the debug window
            m_WorkerThread = Nothing
            Debug.WriteLine("Error stopping service: " + ex.ToString())
        End Try
    End Sub
The updated code shuts
down the CopyRecords thread gracefully when a request to shut down the
service is generated by the system or a user.
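The shutdown sequence in Listing 7 relies on two things: a stop signal that the worker checks, and a Join call with a time-out so that the service never waits forever. The following stand-alone sketch shows that pattern in isolation. It is a miniature built on assumptions: it swaps the m_ThreadAction.StopThread flag for a ManualResetEvent, and the names m_StopEvent and Worker are hypothetical.

```vbnet
Imports System
Imports System.Threading

Module JoinSketch

    ' Hypothetical stop signal; the service itself uses m_ThreadAction.StopThread.
    Private m_StopEvent As New ManualResetEvent(False)

    Private Sub Worker()
        ' Wake up every 100 ms to do work until a stop is signaled.
        While Not m_StopEvent.WaitOne(100, False)
            ' ... one unit of work would go here ...
        End While
    End Sub

    Sub Main()
        Dim t As New Thread(AddressOf Worker)
        t.IsBackground = True
        t.Start()

        ' Request the stop, as OnStop does by setting StopThread = True.
        m_StopEvent.Set()

        ' Join returns True if the thread ended within the time-out (5 seconds here).
        Dim finished As Boolean = t.Join(5000)
        Console.WriteLine("Worker finished in time: " & finished.ToString())
    End Sub

End Module
```

Waiting on an event instead of sleeping lets the worker react to the stop request immediately rather than at the end of its sleep interval, which keeps the Join time-outs short.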
Creating the New <LINQSQL.GetUsers> Method
We need to be able to
easily query our source records in a reusable manner. For this reason I
have implemented the code shown in Listing 8.
Listing 8. <GetUsers> data-retrieval implementation.
    Public Function GetUsers() As User()
        Try
            Dim AllUsers = (From ausers In m_UDC.Users Select ausers).ToArray

            Return AllUsers
        Catch ex As Exception
            Throw New Exception(ex.ToString)
        End Try
    End Function
Understanding LINQ syntax will help clarify the point of this query. LINQ, which is designed for retrieving data from generic sources, supports a syntax similar to T-SQL. Note, however, that the order of the clauses is reversed: the From clause comes first and the Select clause comes last.
Another important thing to note is that by default you don’t declare the type of the object you are querying, nor do you declare an array or collection; the compiler infers the type of the result. Normally you query the records into a variable and then use a For Each loop to iterate through the enumeration that is created for you automatically. However, in this case I want the iteration to occur in the calling method and not in the source method. To make this work, I wrap the query in a call to ToArray, which runs the query as soon as the line is encountered and copies the results into an array. This allows me to pass an array of User types back to the caller, where I iterate through the collection as defined in the <ProcessRecords> method.
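The difference that ToArray makes can be seen with an ordinary in-memory list in place of the Users table. In this sketch (the names list and both result variables are made up for illustration), the deferred query sees an item added after the query was declared, while the array produced by ToArray does not.

```vbnet
Imports System
Imports System.Collections.Generic
Imports System.Linq

Module ToArraySketch

    Sub Main()
        Dim names As New List(Of String)
        names.Add("Ann")
        names.Add("Bob")

        ' Deferred: nothing runs until the query is enumerated.
        Dim deferred As IEnumerable(Of String) = From n In names Select n

        ' Immediate: ToArray runs the query now and copies the results.
        Dim snapshot() As String = (From n In names Select n).ToArray()

        names.Add("Cy")   ' added after both queries were declared

        Console.WriteLine("Deferred count: " & deferred.Count().ToString())
        Console.WriteLine("Snapshot count: " & snapshot.Length.ToString())
    End Sub

End Module
```

The program prints a deferred count of 3 and a snapshot count of 2, which is why GetUsers can safely hand its array to a caller that iterates later.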
Install and Verify
Compile and install
the service. Remember to place some records in your incoming folder if
you don’t have any records in your source database.
Note
You will actually be inserting the same records over and over into your destination data store.
One way to avoid duplicate records is to place a primary key or unique constraint on your destination table that prevents insertion of duplicate records based on that key. The uniqueness should be based on a combination of columns you choose, such as First, Last, and Address. The <InsertRecord> methods in Listing 2 already treat SQL Server error 2627, which is raised when such a constraint is violated, as a duplicate and return True.
The flaw in this solution is that even though we won’t get any duplicates, our query will always pull back every record in our source table. The more records we return, the more processing time and resources are needed over time to pull back both the unprocessed and the already processed records.